Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cost Estimator Using Past Statistics for Schedule Generator #3156

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

Xiao-zhen-Liu
Copy link
Collaborator

@Xiao-zhen-Liu Xiao-zhen-Liu commented Dec 15, 2024

This PR introduces the CostEstimator trait which estimates the cost of a region, given some resource units.

  • The cost estimator is used by CostBasedScheduleGenerator to calculate the cost of a schedule during search.
  • Currently we only consider one type of schedule for each region plan, which is a total order of the regions. The cost of the schedule (and also the cost of the region plan) is thus the summation of the cost of each region.
  • The resource units are currently passed as placeholders because we assume a region will have all the resources when doing the estimation. The units may be used in the future if we consider different methods of schedule-generation. For example, if we allow two regions to run concurrently, the units will be split in half for each region.

A DefaultCostEstimator implementation is also added, which uses past execution statistics to estimate the wall-clock runtime of a region:

  • The runtime of each region is represented by the runtime of its longest-running operator.
  • The runtime of operators are estimated using the statistics from the latest successful execution of the workflow.
  • If such statistics do not exist (e.g., if it is the first execution, or if past executions all failed), we fall back to using number of materialized edges as the cost.
  • Added test cases using mock mysql data.

Copy link
Collaborator

@Yicong-Huang Yicong-Huang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Comment on lines +89 to +95
SqlServer
.getInstance(
StorageConfig.jdbcUrl,
StorageConfig.jdbcUsername,
StorageConfig.jdbcPassword
)
.createDSLContext()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bobbai00 is this the correct way to use connection pool?

Comment on lines +128 to +145
val eid = latestSuccessfulExecution.get.eId
val rawStats = context
.select(
WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID,
WORKFLOW_RUNTIME_STATISTICS.TIME,
WORKFLOW_RUNTIME_STATISTICS.DATA_PROCESSING_TIME,
WORKFLOW_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME
)
.from(WORKFLOW_RUNTIME_STATISTICS)
.where(
WORKFLOW_RUNTIME_STATISTICS.WORKFLOW_ID
.eq(widAsUInteger)
.and(WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID.eq(eid))
)
.orderBy(WORKFLOW_RUNTIME_STATISTICS.TIME, WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID)
.fetchInto(classOf[WorkflowRuntimeStatistics])
.asScala
.toList
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if possible, can we use one query to get the latest successful execution's stats? we can avoid the transaction.

Comment on lines +146 to +158
if (rawStats.isEmpty) {
None
} else {
val cumulatedStats = rawStats.foldLeft(Map.empty[String, Double]) { (acc, stat) =>
val opTotalExecutionTime = acc.getOrElse(stat.getOperatorId, 0.0)
acc + (stat.getOperatorId -> (opTotalExecutionTime + (stat.getDataProcessingTime
.doubleValue() + stat.getControlProcessingTime.doubleValue()) / 1e9))
}
Some(cumulatedStats)
}
} else {
None
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try to use early return or other techniques (e.g., pattern matching) to reduce branches.

Comment on lines +191 to +195
val region = Region(
id = RegionIdentity(0),
physicalOps = workflow.physicalPlan.operators,
physicalLinks = workflow.physicalPlan.links
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test case where the region only contains a sub-dag of the workflow? just to make sure it can extract region costs correctly.

Comment on lines +140 to +144
val region = Region(
id = RegionIdentity(0),
physicalOps = workflow.physicalPlan.operators,
physicalLinks = workflow.physicalPlan.links
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test case where the region only contains a sub-dag of the workflow? just to make sure it can extract region costs correctly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants